ClarkTech Expansion — Implementation

Purpose
- Provide concrete implementation guidance and a minimal prototype for the Hypergraph Engine and in-memory patterns described in the Design doc.

Overview
- This document contains:
  - Redis keyspace and stream examples
  - A minimal `HypergraphEngine` Python prototype suitable for single-process use and unit testing
  - Example usage and basic testing ideas

Redis Keyspace Suggestions
- `sessions:{session_token}` -> hash (operator_id, expires_at, last_heartbeat)
- `room:members:{room_id}` -> set(session_id)
- `entity:{entity_id}` -> hash (entity_type, data)
- `room:entities:{room_id}` -> hash of entity_id -> json
- `events:stream` -> Redis Stream (append-only)
- `spatial:index` -> optional external service or serialized KD-tree metadata

Prototype: HypergraphEngine (minimal)

```python
# Minimal HypergraphEngine prototype
from collections import defaultdict
from dataclasses import dataclass, asdict
from typing import Dict, Any, Set, Optional, List
import time
import uuid
import threading

@dataclass
class HGNode:
    """Record describing one vertex of the hypergraph.

    This is a plain data holder with no validation. When a node is created
    through ``HypergraphEngine.add_node`` the engine supplies dicts for
    ``labels``/``metadata`` and ``time.time()`` values for both timestamps;
    constructing ``HGNode`` directly leaves every defaulted field as ``None``.
    """
    id: str  # unique node identifier (the engine generates a UUID4 string when none is given)
    kind: str  # node type; the engine's type index groups node ids by this value
    # Coordinates; the usage example passes a two-element list.
    # NOTE(review): presumably [lat, lon] — confirm against the design doc.
    position: Optional[List[float]] = None
    frequency: Optional[float] = None  # units are not shown in this file — TODO confirm
    # The engine reads labels['service'] to populate its service index.
    labels: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None  # free-form; the engine stores it without interpreting it
    created_at: Optional[float] = None  # the engine sets this from time.time() (epoch seconds)
    updated_at: Optional[float] = None  # the engine sets this equal to created_at on creation

@dataclass
class HGEdge:
    """Record describing one hyperedge: a typed relation over a set of nodes.

    This is a plain data holder with no validation. ``nodes`` is a set, so a
    single edge can join any number of node ids and duplicates collapse.
    Constructing ``HGEdge`` directly leaves ``labels``, ``metadata`` and both
    timestamps as ``None`` unless they are passed explicitly.
    """
    id: str  # unique edge identifier (the engine generates a UUID4 string when none is given)
    kind: str  # relation type, e.g. 'co_location' in the usage example
    nodes: Set[str]  # ids of the member nodes
    weight: float = 1.0  # meaning of the weight is not defined in this file — TODO confirm
    labels: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None  # free-form; the engine stores it without interpreting it
    created_at: Optional[float] = None  # the engine sets this from time.time() (epoch seconds)
    updated_at: Optional[float] = None  # the engine sets this equal to created_at on creation

class HypergraphEngine:
    """In-memory hypergraph store with secondary indexes.

    State:
        nodes:          node id -> HGNode
        edges:          edge id -> HGEdge
        node_to_edges:  node id -> ids of the edges that contain the node
        type_index:     node kind -> ids of the nodes of that kind
        service_index:  service name -> ids of the nodes labelled with it

    Every public method runs under ``self.lock``. The lock is re-entrant
    because ``remove_node`` and ``add_edge`` call ``remove_edge`` while
    already holding it.

    Returned ``HGNode``/``HGEdge`` objects are the stored instances, not
    copies. The service index is computed from ``labels`` when the node is
    added; mutating ``labels`` afterwards does not update the index, so
    re-add the node with the same id to re-index it.
    """

    def __init__(self):
        self.lock = threading.RLock()
        self.nodes: Dict[str, HGNode] = {}
        self.edges: Dict[str, HGEdge] = {}
        self.node_to_edges: Dict[str, Set[str]] = defaultdict(set)
        self.type_index: Dict[str, Set[str]] = defaultdict(set)
        self.service_index: Dict[str, Set[str]] = defaultdict(set)

    def _now(self):
        """Return the current wall-clock time in epoch seconds."""
        return time.time()

    @staticmethod
    def _service_names(labels: Optional[Dict[str, Any]]) -> List[Any]:
        """Return the service names found under ``labels['service']``.

        A bare string is treated as a single service name; iterating it
        directly would index the node under each individual character.
        """
        raw = (labels or {}).get('service') or []
        if isinstance(raw, str):
            return [raw]
        return list(raw)

    def _unindex_node(self, node: HGNode) -> None:
        """Drop ``node`` from the type and service indexes (lock must be held)."""
        self.type_index[node.kind].discard(node.id)
        for service in self._service_names(node.labels):
            self.service_index[service].discard(node.id)

    def add_node(self, node_id: Optional[str], kind: str, **kwargs) -> HGNode:
        """Create a node, index it, and return it.

        Args:
            node_id: Identifier to use; a UUID4 string is generated when falsy.
            kind: Node type used for ``type_index``.
            **kwargs: Optional ``labels``, ``metadata``, ``position`` and
                ``frequency``; other keys are ignored.

        If ``node_id`` already exists the stored node is replaced: its old
        type/service index entries are removed first, while edges that
        reference the id are kept.
        """
        with self.lock:
            nid = node_id or str(uuid.uuid4())
            labels = kwargs.get('labels') or {}
            # Resolve services before touching any state so that a malformed
            # 'service' label cannot leave a half-indexed node behind.
            services = self._service_names(labels)
            now = self._now()
            node = HGNode(id=nid, kind=kind, created_at=now, updated_at=now,
                          labels=labels,
                          metadata=kwargs.get('metadata') or {},
                          position=kwargs.get('position'),
                          frequency=kwargs.get('frequency'))
            previous = self.nodes.get(nid)
            if previous is not None:
                # Replacing a node: purge entries keyed on the old kind/labels.
                self._unindex_node(previous)
            self.nodes[nid] = node
            self.type_index[kind].add(nid)
            for service in services:
                self.service_index[service].add(nid)
            return node

    def remove_node(self, node_id: str) -> bool:
        """Delete a node together with every edge that contains it.

        Returns:
            True if the node existed and was removed, False otherwise.
        """
        with self.lock:
            if node_id not in self.nodes:
                return False
            # Copy first: remove_edge mutates the set being iterated.
            for eid in list(self.node_to_edges.get(node_id, set())):
                self.remove_edge(eid)
            node = self.nodes.pop(node_id)
            self._unindex_node(node)
            self.node_to_edges.pop(node_id, None)
            return True

    def add_edge(self, edge_id: Optional[str], kind: str, nodes: List[str], **kwargs) -> HGEdge:
        """Create a hyperedge over ``nodes``, index it, and return it.

        Args:
            edge_id: Identifier to use; a UUID4 string is generated when falsy.
            kind: Relation type.
            nodes: Member node ids; duplicates collapse into a set.
            **kwargs: Optional ``weight`` (defaults to 1.0), ``labels`` and
                ``metadata``; other keys are ignored.

        Member ids are not checked against ``self.nodes``. If ``edge_id``
        already exists the old edge is removed first so that nodes which are
        no longer members stop pointing at it.
        """
        with self.lock:
            eid = edge_id or str(uuid.uuid4())
            now = self._now()
            weight = kwargs.get('weight')
            edge = HGEdge(id=eid, kind=kind, nodes=set(nodes),
                          weight=1.0 if weight is None else weight,
                          labels=kwargs.get('labels') or {},
                          metadata=kwargs.get('metadata') or {},
                          created_at=now, updated_at=now)
            if eid in self.edges:
                self.remove_edge(eid)
            self.edges[eid] = edge
            for n in edge.nodes:
                self.node_to_edges[n].add(eid)
            return edge

    def remove_edge(self, edge_id: str) -> bool:
        """Delete an edge and detach it from its member nodes.

        Returns:
            True if the edge existed and was removed, False otherwise.
        """
        with self.lock:
            edge = self.edges.pop(edge_id, None)
            if edge is None:
                return False
            for n in edge.nodes:
                # .get avoids creating empty defaultdict entries.
                attached = self.node_to_edges.get(n)
                if attached is not None:
                    attached.discard(edge_id)
            return True

    def get_node(self, node_id: str) -> Optional[HGNode]:
        """Return the stored node for ``node_id``, or None if absent."""
        with self.lock:
            return self.nodes.get(node_id)

    def get_edge(self, edge_id: str) -> Optional[HGEdge]:
        """Return the stored edge for ``edge_id``, or None if absent."""
        with self.lock:
            return self.edges.get(edge_id)

    def neighbors(self, node_id: str) -> Set[str]:
        """Return ids of all nodes sharing at least one edge with ``node_id``.

        The node itself is excluded. Unknown ids yield an empty set.
        """
        with self.lock:
            neigh: Set[str] = set()
            for eid in self.node_to_edges.get(node_id, set()):
                edge = self.edges.get(eid)
                if edge:
                    neigh.update(edge.nodes)
            neigh.discard(node_id)
            return neigh

    def nodes_by_type(self, kind: str) -> Set[str]:
        """Return a copy of the set of node ids whose kind is ``kind``."""
        with self.lock:
            # The copy is taken under the lock so a concurrent writer cannot
            # resize the set while it is being iterated.
            return set(self.type_index.get(kind, set()))

    def to_dict(self) -> Dict[str, Any]:
        """Return a snapshot of all nodes and edges as plain dicts.

        Every dataclass field is included for both nodes and edges. Edge
        ``nodes`` are emitted as a sorted list so the output is deterministic
        and does not contain a set.
        """
        with self.lock:
            return {
                'nodes': {k: asdict(v) for k, v in self.nodes.items()},
                'edges': {k: {**asdict(v), 'nodes': sorted(v.nodes)}
                          for k, v in self.edges.items()},
            }
```

Example usage

```python
# Two entities at the same position, joined by a single hyperedge.
engine = HypergraphEngine()
vessel = engine.add_node(
    None,  # let the engine generate the id
    'vessel',
    position=[37.7, -122.4],
    labels={'service': ['AIS']},
)
host = engine.add_node(
    None,
    'network_host',
    position=[37.7, -122.4],
    labels={'service': ['http']},
)
link = engine.add_edge(None, 'co_location', [vessel.id, host.id])
# Sharing an edge makes the two nodes neighbours of each other.
assert host.id in engine.neighbors(vessel.id)
```

Basic testing ideas
- Unit tests for add/remove node/edge
- Tests for index consistency after operations
- Integration test: snapshot → reload → replay events → verify equivalence

Integration suggestions
- Provide adapter that subscribes to Redis Streams and applies events to the HypergraphEngine
- Snapshot engine state to JSON and persist periodically; on restart, reload JSON then replay stream tail
- Use Redis Streams consumer groups for write-behind persistence worker(s)

Next steps
- If you approve, I will integrate this prototype into the repository under `NerfEngine/hypergraph_engine.py` and add unit tests.

API smoke tests (curl)
- The commands below target `http://localhost:8080` and pipe responses through `jq`.
- Replace `<MISSION_ID>`, `<TASK_ID>`, and `<ENTRY_ID>` with the ids returned by earlier calls.

```bash
# create a mission (if you don't have one)
curl -s -X POST http://localhost:8080/api/missions -H "Content-Type: application/json" -d '{"name":"test mission"}' | jq

# create a task for mission_id returned above
curl -s -X POST http://localhost:8080/api/missions/<MISSION_ID>/tasks -H "Content-Type: application/json" -d '{"title":"Investigate target","priority":3,"payload":{"entity_id":"ENTITY-0001"}}' | jq

# list tasks
curl -s http://localhost:8080/api/missions/<MISSION_ID>/tasks | jq

# patch a task
curl -s -X PATCH http://localhost:8080/api/missions/<MISSION_ID>/tasks/<TASK_ID> -H "Content-Type: application/json" -d '{"status":"IN_PROGRESS"}' | jq

# delete a task
curl -s -X DELETE http://localhost:8080/api/missions/<MISSION_ID>/tasks/<TASK_ID> | jq

# add watchlist entry
curl -s -X POST http://localhost:8080/api/missions/<MISSION_ID>/watchlist -H "Content-Type: application/json" -d '{"entity_id":"drone-1","note":"monitor comms"}' | jq

# list watchlist
curl -s http://localhost:8080/api/missions/<MISSION_ID>/watchlist | jq

# delete watchlist entry (use returned id)
curl -s -X DELETE http://localhost:8080/api/missions/<MISSION_ID>/watchlist/<ENTRY_ID> | jq
```